package org.springboot.kafka.config;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Future;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaOriginalConfig extends Thread {

	private static final Logger logger = LoggerFactory.getLogger(KafkaOriginalConfig.class);

	/** Back-off before rebuilding the consumer after an unexpected failure. */
	private static final long RETRY_BACKOFF_MS = 6 * 1000L;

	@Value("${spring.kafka.three.bootstrap-servers}")
    private String bootstrapServersThree;

	/** Comma-separated list of topic names to subscribe to. */
    @Value("${spring.kafka.three.topic}")
    private String topicThree;

	/** Consumer owned exclusively by this thread; rebuilt by run() on failure. */
	private KafkaConsumer<String, String> consumer;

	/** Lazily created on first send(); kept public for backward compatibility with existing callers. */
	public KafkaProducer<String, String> producer;

	/**
	 * Starts the consumer thread once Spring has injected the configuration values.
	 * (Must not run in the constructor — @Value fields are not populated yet there.)
	 */
	@PostConstruct
	public void init() {
		super.setName("ApacheKafka");
		super.start();
	}

	/**
	 * Consumer loop. (Re)creates the consumer, then polls forever. On any
	 * unexpected error the consumer is closed and rebuilt after a short
	 * back-off. Interruption stops the thread cleanly instead of being
	 * swallowed as in the original code.
	 */
	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			try {
				if (consumer != null) {
					consumer.close();
				}
				consumer = initConsumer();
				readMessages();
			} catch (Exception e) {
				logger.error("未知异常", e);
				try {
					Thread.sleep(RETRY_BACKOFF_MS);
				} catch (InterruptedException e1) {
					// FIX: restore the interrupt flag and exit instead of swallowing it,
					// so the thread can actually be stopped.
					Thread.currentThread().interrupt();
					return;
				}
			}
		}
	}

	/**
	 * Polls and logs messages until interrupted. Offsets are committed
	 * asynchronously only when the poll actually returned records
	 * (auto-commit is disabled in initConsumer()).
	 *
	 * @throws Exception any client error propagates so run() can rebuild the consumer
	 */
	private void readMessages() throws Exception {
		while (!Thread.currentThread().isInterrupted()) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
			for (ConsumerRecord<String, String> record : records) {
				logger.info(" kafka three 接收到消息：{}", record.topic() + ":" + record.value());
			}
			// FIX: don't issue a commit when nothing was consumed.
			if (!records.isEmpty()) {
				consumer.commitAsync();
			}
		}
	}

	/**
	 * Builds a manually-committing String/String consumer subscribed to all
	 * topics listed in {@code topicThree}.
	 *
	 * @return a new, subscribed KafkaConsumer
	 */
	private KafkaConsumer<String, String> initConsumer() {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", bootstrapServersThree);
		props.setProperty("group.id", "UIS-RT-Consumer");
		// Offsets are committed explicitly in readMessages().
		props.setProperty("enable.auto.commit", "false");
		props.setProperty("auto.offset.reset", "earliest");
		// Cap on the number of records returned by a single poll().
		props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
		props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList(topicThree.split(",")));
		return consumer;
	}

	/**
	 * Builds a String/String producer requiring full-ISR acknowledgement.
	 * NOTE(review): "retries=0" together with "acks=all" means a transient
	 * broker error fails the send immediately — confirm this is intended.
	 *
	 * @return a new KafkaProducer
	 */
	private KafkaProducer<String, String> initProducer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", bootstrapServersThree);
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		return new KafkaProducer<>(props);
	}

	/**
	 * Sends a message and blocks until it is acknowledged or fails.
	 * On failure the producer is closed and discarded so the next call
	 * creates a fresh one, and the error is rethrown as a RuntimeException.
	 *
	 * @param sendTopic destination topic
	 * @param value     message payload
	 * @throws RuntimeException if the send fails
	 */
	public void send(String sendTopic, String value) {
		// FIX: guard the lazy init — send() may be called from multiple threads,
		// and the original unsynchronized check could create two producers.
		synchronized (this) {
			if (producer == null) {
				producer = initProducer();
			}
		}
		/**
		 * Partitioning rules:
		 * 1. If a partition is specified on the record, it is written to that partition.
		 * 2. If no partition is given but a key is set, the partition is derived from the key's hash.
		 * 3. With neither partition nor key, partitions are chosen round-robin.
		 *
		 * Note: writing to a non-existent topic auto-creates it; by default with
		 * a single partition and a single replica.
		 */
		ProducerRecord<String, String> record = new ProducerRecord<>(sendTopic, value);
		Future<RecordMetadata> send = producer.send(record, new Callback() {
		    @Override
		    public void onCompletion(RecordMetadata metadata, Exception exception) {
		    	// FIX: metadata is null when exception != null — the original code
		    	// dereferenced it unconditionally and threw an NPE inside the callback.
		    	if (exception != null) {
		    		logger.error("kafkaThree send failed, topic:{}", sendTopic, exception);
		    	} else {
		    		logger.info("kafkaThree 消息发送通知, topic:{}, partition:{}, offset:{}", metadata.topic(), metadata.partition(), metadata.offset());
		    	}
		    }
		});
		if (send != null) {
			try {
				// Block until the send completes.
				send.get();
			} catch (Exception e) {
				logger.error("kafka消息 {} 发送异常！ topic: {}", value, sendTopic, e);
				if (e instanceof InterruptedException) {
					// Preserve the caller's interrupt status.
					Thread.currentThread().interrupt();
				}
				synchronized (this) {
					if (producer != null) {
						producer.close();
						// FIX: null out the closed producer so the next send()
						// re-initializes instead of using a closed client.
						producer = null;
					}
				}
				throw new RuntimeException(e);
			}
        }
	}
}
